{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Consuming Messages from Kafka  Tour Producer Using Scala Spark\n",
    "\n",
    "To run this notebook you should have taken the Kafka tour and created the Producer and Consumer jobs. I.e your Job UI should look like this: \n",
    "\n",
    "![kafka11.png](./images/kafka11.png)\n",
    "\n",
    "In this notebook we will consume messages from Kafka that were produced by the producer-job created in the Demo. Go to the Jobs-UI in hopsworks and start the Kafka producer job:\n",
    "\n",
    "![kafka12.png](./images/kafka12.png)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Imports"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Starting Spark application\n"
     ]
    },
    {
     "data": {
      "text/html": [
       "<table>\n",
       "<tr><th>ID</th><th>YARN Application ID</th><th>Kind</th><th>State</th><th>Spark UI</th><th>Driver log</th><th>Current session?</th></tr><tr><td>0</td><td>application_1538645926086_0001</td><td>spark</td><td>idle</td><td><a target=\"_blank\" href=\"http://hopsworks0:8088/proxy/application_1538645926086_0001/\">Link</a></td><td><a target=\"_blank\" href=\"http://hopsworks0:8042/node/containerlogs/container_e01_1538645926086_0001_01_000001/KafkaPython__meb10000\">Link</a></td><td>✔</td></tr></table>"
      ],
      "text/plain": [
       "<IPython.core.display.HTML object>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "SparkSession available as 'spark'.\n",
      "import org.apache.kafka.clients.consumer.ConsumerRecord\n",
      "import org.apache.kafka.common.serialization.StringDeserializer\n",
      "import org.apache.kafka.common.serialization.StringSerializer\n",
      "import org.apache.spark.streaming.kafka010._\n",
      "import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent\n",
      "import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe\n",
      "import io.hops.util.Hops\n",
      "import org.apache.spark._\n",
      "import org.apache.spark.streaming._\n"
     ]
    }
   ],
   "source": [
    "import org.apache.kafka.clients.consumer.ConsumerRecord\n",
    "import org.apache.kafka.common.serialization.StringDeserializer\n",
    "import org.apache.kafka.common.serialization.StringSerializer\n",
    "import org.apache.spark.streaming.kafka010._\n",
    "import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent\n",
    "import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe\n",
    "import io.hops.util.Hops\n",
    "import org.apache.spark._\n",
    "import org.apache.spark.streaming._"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Constants\n",
    "\n",
    "<span style=\"color:red\">Update</span> the `TOPIC_NAME` field to reflect the name of your Kafka topic that was created in your Kafka tour (e.g \"DemoKafkaTopic_3\")\n",
    "\n",
    "Update the `OUTPUT_PATH` field to where the output data should be written"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "topicName: String = test2\n",
      "outputPath: String = /Projects/KafkaPython/Resources/data2-txt\n",
      "checkpointPath: String = /Projects/KafkaPython/Resources/checkpoint2-txt\n"
     ]
    }
   ],
   "source": [
    "val topicName = \"test2\"\n",
    "val outputPath = \"/Projects/\" + Hops.getProjectName() + \"/Resources/data2-txt\"\n",
    "val checkpointPath = \"/Projects/\" + Hops.getProjectName() + \"/Resources/checkpoint2-txt\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Consume the Kafka Topic using Spark and Write to a Sink\n",
    "\n",
    "The below snippet creates a streaming DataFrame with Kafka as a data source. Spark is lazy so it will not start streaming the data from Kafka into the dataframe until we specify an output sink (which we do later on in this notebook)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "df: org.apache.spark.sql.DataFrame = [key: binary, value: binary ... 5 more fields]\n"
     ]
    }
   ],
   "source": [
    "val df = spark.readStream.format(\"kafka\").\n",
    "      option(\"kafka.bootstrap.servers\", Hops.getBrokerEndpoints()).\n",
    "      option(\"kafka.security.protocol\",\"SSL\").\n",
    "      option(\"kafka.ssl.truststore.location\",Hops.getTrustStore()).\n",
    "      option(\"kafka.ssl.truststore.password\", Hops.getKeystorePwd().filterNot(_.toInt < 32).filterNot(_.toInt == 64)).\n",
    "      option(\"kafka.ssl.keystore.location\",Hops.getKeyStore()).\n",
    "      option(\"kafka.ssl.keystore.password\",Hops.getKeystorePwd().filterNot(_.toInt < 32).filterNot(_.toInt == 64)).\n",
    "      option(\"kafka.ssl.key.password\",Hops.getKeystorePwd().filterNot(_.toInt < 32).filterNot(_.toInt == 64)).\n",
    "      option(\"kafka.ssl.endpoint.identification.algorithm\",\"\").\n",
    "      option(\"subscribe\", topicName).load();"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "When using Kafka as a data source, Spark gives us a default kafka schema as printed below"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- key: binary (nullable = true)\n",
      " |-- value: binary (nullable = true)\n",
      " |-- topic: string (nullable = true)\n",
      " |-- partition: integer (nullable = true)\n",
      " |-- offset: long (nullable = true)\n",
      " |-- timestamp: timestamp (nullable = true)\n",
      " |-- timestampType: integer (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.printSchema()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We are using the Spark structured streaming engine, which means that we can express stream queries just as we would do in batch jobs. \n",
    "\n",
    "Below we filter the input stream to select only the message values"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "messages: org.apache.spark.sql.DataFrame = [value: string]\n"
     ]
    }
   ],
   "source": [
    "val messages = df.selectExpr(\"CAST(value AS STRING)\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Specify the output query and the sink of the stream job to be a CSV file in HopsFS. \n",
    "\n",
    "By using checkpointing and a WAL, spark gives us end-to-end exactly-once fault-tolerance"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "query: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@19e5278f\n"
     ]
    }
   ],
   "source": [
    "val query = messages.\n",
    "        writeStream.\n",
    "        format(\"text\").\n",
    "        option(\"path\", outputPath).\n",
    "        option(\"checkpointLocation\", checkpointPath).\n",
    "        start()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Run the streaming job, in theory streaming jobs should run forever. \n",
    "\n",
    "The call below will be blocking and not terminate. To kill this job you have to restart the pyspark kernel."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "query.awaitTermination()\n",
    "query.stop()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "While the job is running you can go to the HDFS file browser in the Hopsworks UI to preview the files:\n",
    "\n",
    "![kafka14.png](./images/kafka14.png)\n",
    "![kafka13.png](./images/kafka13.png)\n",
    "![kafka15.png](./images/kafka15.png)\n",
    "![kafka16.png](./images/kafka16.png)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Spark",
   "language": "",
   "name": "sparkkernel"
  },
  "language_info": {
   "codemirror_mode": "text/x-scala",
   "mimetype": "text/x-scala",
   "name": "scala",
   "pygments_lexer": "scala"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
